package pointers_test

import (
	"bytes"
	"context"
	"errors"
	"io"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

// TestReader does a basic end-to-end test over a reader with a predicate applied.
func TestReader(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col2", ColumnIndex: 1, ValuesBloomFilter: []byte{4, 5, 6}},
	})

	var (
		pathCol         = sec.Columns()[0]
		sectionCol      = sec.Columns()[1]
		pointerKindCol  = sec.Columns()[2]
		streamIDCol     = sec.Columns()[3]
		streamIDRefCol  = sec.Columns()[4]
		minTimestampCol = sec.Columns()[5]
		maxTimestampCol = sec.Columns()[6]
		rowCountCol     = sec.Columns()[7]
		uncompressedCol = sec.Columns()[8]
		columnNameCol   = sec.Columns()[9]
		columnIndexCol  = sec.Columns()[10]
		valuesBloomCol  = sec.Columns()[11]
	)

	require.Equal(t, "path", pathCol.Name)
	require.Equal(t, pointers.ColumnTypePath, pathCol.Type)
	require.Equal(t, "", sectionCol.Name)
	require.Equal(t, pointers.ColumnTypeSection, sectionCol.Type)
	require.Equal(t, "", pointerKindCol.Name)
	require.Equal(t, pointers.ColumnTypePointerKind, pointerKindCol.Type)
	require.Equal(t, "", streamIDCol.Name)
	require.Equal(t, pointers.ColumnTypeStreamID, streamIDCol.Type)
	require.Equal(t, "", streamIDRefCol.Name)
	require.Equal(t, pointers.ColumnTypeStreamIDRef, streamIDRefCol.Type)
	require.Equal(t, "", minTimestampCol.Name)
	require.Equal(t, pointers.ColumnTypeMinTimestamp, minTimestampCol.Type)
	require.Equal(t, "", maxTimestampCol.Name)
	require.Equal(t, pointers.ColumnTypeMaxTimestamp, maxTimestampCol.Type)
	require.Equal(t, "", rowCountCol.Name)
	require.Equal(t, pointers.ColumnTypeRowCount, rowCountCol.Type)
	require.Equal(t, "", uncompressedCol.Name)
	require.Equal(t, pointers.ColumnTypeUncompressedSize, uncompressedCol.Type)
	require.Equal(t, "column_name", columnNameCol.Name)
	require.Equal(t, pointers.ColumnTypeColumnName, columnNameCol.Type)
	require.Equal(t, "", columnIndexCol.Name)
	require.Equal(t, pointers.ColumnTypeColumnIndex, columnIndexCol.Type)
	require.Equal(t, "values_bloom_filter", valuesBloomCol.Name)
	require.Equal(t, pointers.ColumnTypeValuesBloomFilter, valuesBloomCol.Type)

	for _, tt := range []struct {
		name     string
		columns  []*pointers.Column
		expected arrowtest.Rows
	}{
		{
			name:    "basic reads with predicate",
			columns: []*pointers.Column{pathCol, sectionCol, pointerKindCol, streamIDCol, streamIDRefCol},
			expected: arrowtest.Rows{
				{"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(10), "stream_id_ref.int64": int64(100)},
				{"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(20), "stream_id_ref.int64": int64(200)},
			},
		},
		// tests that the reader evaluates predicates correctly even when only some columns are selected for output
		{
			name:    "reads with subset of columns",
			columns: []*pointers.Column{pathCol, sectionCol, pointerKindCol, streamIDCol},
			expected: arrowtest.Rows{
				{"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(10)},
				{"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(20)},
			},
		},
		// tests reading all columns
		{
			name: "read all columns for stream pointers",
			columns: []*pointers.Column{
				pathCol, sectionCol, pointerKindCol, streamIDCol, streamIDRefCol,
				minTimestampCol, maxTimestampCol, rowCountCol, uncompressedCol,
			},
			expected: arrowtest.Rows{
				{
					"path.path.utf8":          "path1",
					"section.int64":           int64(1),
					"pointer_kind.int64":      int64(pointers.PointerKindStreamIndex),
					"stream_id.int64":         int64(10),
					"stream_id_ref.int64":     int64(100),
					"min_timestamp.timestamp": unixTime(10).UTC(),
					"max_timestamp.timestamp": unixTime(20).UTC(),
					"row_count.int64":         int64(2),
					"uncompressed_size.int64": int64(1024),
				},
				{
					"path.path.utf8":          "path2",
					"section.int64":           int64(2),
					"pointer_kind.int64":      int64(pointers.PointerKindStreamIndex),
					"stream_id.int64":         int64(20),
					"stream_id_ref.int64":     int64(200),
					"min_timestamp.timestamp": unixTime(30).UTC(),
					"max_timestamp.timestamp": unixTime(40).UTC(),
					"row_count.int64":         int64(2),
					"uncompressed_size.int64": int64(2048),
				},
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			r := pointers.NewReader(pointers.ReaderOptions{
				Columns:   tt.columns,
				Allocator: memory.DefaultAllocator,
				Predicates: []pointers.Predicate{
					pointers.InPredicate{
						Column: streamIDCol,
						Values: []scalar.Scalar{
							scalar.NewInt64Scalar(10),
							scalar.NewInt64Scalar(20),
						},
					},
					pointers.EqualPredicate{
						Column: pointerKindCol,
						Value:  scalar.NewInt64Scalar(int64(pointers.PointerKindStreamIndex)),
					},
				},
			})

			actualTable, err := readTable(context.Background(), r)
			require.NoError(t, err)

			actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
			require.NoError(t, err, "failed to get rows from table")
			require.Equal(t, tt.expected, actual)
		})
	}
}

// TestReaderWithEqualPredicate tests reading with an EqualPredicate.
func TestReaderWithEqualPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.EqualPredicate{
				Column: streamIDCol,
				Value:  scalar.NewInt64Scalar(20),
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithInPredicate tests reading with an InPredicate.
func TestReaderWithInPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
		{Path: "path4", Section: 4, PointerKind: pointers.PointerKindStreamIndex, StreamID: 40, StreamIDRef: 400, StartTs: unixTime(70), EndTs: unixTime(80), LineCount: 20, UncompressedSize: 4096},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.InPredicate{
				Column: streamIDCol,
				Values: []scalar.Scalar{
					scalar.NewInt64Scalar(10),
					scalar.NewInt64Scalar(30),
				},
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)},
		{"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithGreaterThanPredicate tests reading with a GreaterThanPredicate.
func TestReaderWithGreaterThanPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.GreaterThanPredicate{
				Column: streamIDCol,
				Value:  scalar.NewInt64Scalar(15),
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)},
		{"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithLessThanPredicate tests reading with a LessThanPredicate.
func TestReaderWithLessThanPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.LessThanPredicate{
				Column: streamIDCol,
				Value:  scalar.NewInt64Scalar(25),
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)},
		{"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithTimestampPredicates tests reading with timestamp predicates.
func TestReaderWithTimestampPredicates(t *testing.T) {
	var (
		t10  = unixTime(10)
		t20  = unixTime(20)
		t25  = unixTime(25)
		t25s = scalar.NewTimestampScalar(arrow.Timestamp(t25.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond})
		t30  = unixTime(30)
		t40  = unixTime(40)
		t50  = unixTime(50)
		t55  = unixTime(55)
		t55s = scalar.NewTimestampScalar(arrow.Timestamp(t55.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond})
		t60  = unixTime(60)
	)
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: t10, EndTs: t20, LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: t30, EndTs: t40, LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: t50, EndTs: t60, LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol         = sec.Columns()[0]
		sectionCol      = sec.Columns()[1]
		minTimestampCol = sec.Columns()[5]
		maxTimestampCol = sec.Columns()[6]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, minTimestampCol, maxTimestampCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.WhereTimeRangeOverlapsWith(minTimestampCol, maxTimestampCol, t25s, t55s),
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{
			"path.path.utf8":          "path2",
			"section.int64":           int64(2),
			"min_timestamp.timestamp": t30.UTC(),
			"max_timestamp.timestamp": t40.UTC(),
		},
		{
			"path.path.utf8":          "path3",
			"section.int64":           int64(3),
			"min_timestamp.timestamp": t50.UTC(),
			"max_timestamp.timestamp": t60.UTC(),
		},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithFuncPredicate tests reading with a FuncPredicate.
func TestReaderWithFuncPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol    = sec.Columns()[0]
		sectionCol = sec.Columns()[1]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.FuncPredicate{
				Column: pathCol,
				Keep: func(_ *pointers.Column, value scalar.Scalar) bool {
					if !value.IsValid() {
						return false
					}

					bb := value.(*scalar.String).Value.Bytes()
					return bytes.Equal(bb, []byte("path1")) || bytes.Equal(bb, []byte("path3"))
				},
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1)},
		{"path.path.utf8": "path3", "section.int64": int64(3)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithAndPredicate tests reading with an AndPredicate.
func TestReaderWithAndPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.AndPredicate{
				Left: pointers.GreaterThanPredicate{
					Column: streamIDCol,
					Value:  scalar.NewInt64Scalar(5),
				},
				Right: pointers.LessThanPredicate{
					Column: streamIDCol,
					Value:  scalar.NewInt64Scalar(25),
				},
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)},
		{"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithOrPredicate tests reading with an OrPredicate.
func TestReaderWithOrPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.OrPredicate{
				Left: pointers.EqualPredicate{
					Column: streamIDCol,
					Value:  scalar.NewInt64Scalar(10),
				},
				Right: pointers.EqualPredicate{
					Column: streamIDCol,
					Value:  scalar.NewInt64Scalar(30),
				},
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)},
		{"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithNotPredicate tests reading with a NotPredicate.
func TestReaderWithNotPredicate(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072},
	})

	var (
		pathCol     = sec.Columns()[0]
		sectionCol  = sec.Columns()[1]
		streamIDCol = sec.Columns()[3]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:   []*pointers.Column{pathCol, sectionCol, streamIDCol},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.NotPredicate{
				Inner: pointers.EqualPredicate{
					Column: streamIDCol,
					Value:  scalar.NewInt64Scalar(20),
				},
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)},
		{"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithColumnIndexPointers tests reading column index pointers.
func TestReaderWithColumnIndexPointers(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col2", ColumnIndex: 1, ValuesBloomFilter: []byte{4, 5, 6}},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col3", ColumnIndex: 2, ValuesBloomFilter: []byte{7, 8, 9}},
	})

	cols := sec.Columns()
	require.Len(t, cols, 6, "expected 6 columns for column index pointers")

	var (
		pathCol        = cols[0]
		sectionCol     = cols[1]
		pointerKindCol = cols[2]
		columnNameCol  = cols[3]
		columnIndexCol = cols[4]
		valuesBloomCol = cols[5]
	)

	r := pointers.NewReader(pointers.ReaderOptions{
		Columns: []*pointers.Column{
			pathCol, sectionCol, pointerKindCol, columnNameCol, columnIndexCol, valuesBloomCol,
		},
		Allocator: memory.DefaultAllocator,
		Predicates: []pointers.Predicate{
			pointers.EqualPredicate{
				Column: pointerKindCol,
				Value:  scalar.NewInt64Scalar(int64(pointers.PointerKindColumnIndex)),
			},
		},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{
			"path.path.utf8":                                 "path1",
			"section.int64":                                  int64(1),
			"pointer_kind.int64":                             int64(pointers.PointerKindColumnIndex),
			"column_name.column_name.utf8":                   "col1",
			"column_index.int64":                             int64(0),
			"values_bloom_filter.values_bloom_filter.binary": []byte{1, 2, 3},
		},
		{
			"path.path.utf8":                                 "path2",
			"section.int64":                                  int64(2),
			"pointer_kind.int64":                             int64(pointers.PointerKindColumnIndex),
			"column_name.column_name.utf8":                   "col2",
			"column_index.int64":                             int64(1),
			"values_bloom_filter.values_bloom_filter.binary": []byte{4, 5, 6},
		},
		{
			"path.path.utf8":                                 "path3",
			"section.int64":                                  int64(3),
			"pointer_kind.int64":                             int64(pointers.PointerKindColumnIndex),
			"column_name.column_name.utf8":                   "col3",
			"column_index.int64":                             int64(2),
			"values_bloom_filter.values_bloom_filter.binary": []byte{7, 8, 9},
		},
	}
	require.Equal(t, expected, actual)
}

// TestReaderWithMixedPointers tests reading both stream and column index pointers.
func TestReaderWithMixedPointers(t *testing.T) {
	sec := buildSection(t, []pointers.SectionPointer{
		{Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024},
		{Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}},
		{Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048},
	})

	var (
		pathCol        = sec.Columns()[0]
		sectionCol     = sec.Columns()[1]
		pointerKindCol = sec.Columns()[2]
	)

	// Read all pointer kinds without filtering
	r := pointers.NewReader(pointers.ReaderOptions{
		Columns:    []*pointers.Column{pathCol, sectionCol, pointerKindCol},
		Allocator:  memory.DefaultAllocator,
		Predicates: []pointers.Predicate{},
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err)

	expected := arrowtest.Rows{
		{"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex)},
		{"path.path.utf8": "path3", "section.int64": int64(3), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex)},
		{"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindColumnIndex)},
	}
	require.Equal(t, expected, actual)
}

func buildSection(t *testing.T, ptrData []pointers.SectionPointer) *pointers.Section {
	t.Helper()

	sectionBuilder := pointers.NewBuilder(nil, 0, 2)

	for _, ptr := range ptrData {
		if ptr.PointerKind == pointers.PointerKindStreamIndex {
			sectionBuilder.ObserveStream(ptr.Path, ptr.Section, ptr.StreamIDRef, ptr.StreamID, ptr.StartTs, ptr.UncompressedSize)
			sectionBuilder.ObserveStream(ptr.Path, ptr.Section, ptr.StreamIDRef, ptr.StreamID, ptr.EndTs, 0)
		} else {
			sectionBuilder.RecordColumnIndex(ptr.Path, ptr.Section, ptr.ColumnName, ptr.ColumnIndex, ptr.ValuesBloomFilter)
		}
	}

	objectBuilder := dataobj.NewBuilder(nil)
	require.NoError(t, objectBuilder.Append(sectionBuilder))

	obj, closer, err := objectBuilder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { closer.Close() })

	sec, err := pointers.Open(t.Context(), obj.Sections()[0])
	require.NoError(t, err)
	return sec
}

func readTable(ctx context.Context, r *pointers.Reader) (arrow.Table, error) {
	var recs []arrow.RecordBatch

	for {
		rec, err := r.Read(ctx, 128)
		if rec != nil {
			if rec.NumRows() > 0 {
				recs = append(recs, rec)
			}
		}

		if err != nil && errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return nil, err
		}
	}

	if len(recs) == 0 {
		return nil, io.EOF
	}

	return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}
